# Source Generated with Decompyle++
# File: in.pyc (Python 2.4)
'''SpamExperts State engine.'''
import os
import sys
import time
import types
import errno
import Queue
import socket
import _winreg
import threading
import xmlrpclib
import email
import email.Utils
import email.Header

try:
    import POW
except ImportError:
    POW = None

from spambayes.FileCorpus import FileCorpus
from spambayes.storage import SpamTrainer, HamTrainer, Trainer
from spambayes.message import open_storage as open_message_storage
from spambayes.storage import ensureDir, open_storage, NO_TRAINING_FLAG

from spamexperts.Options import options

# Temporarily turn off the verbose option while sb_server is imported.
verbose = options[('globals', 'verbose')]
options[('globals', 'verbose')] = False
from spambayes.scripts.sb_server import close_platform_mutex
options[('globals', 'verbose')] = verbose
del verbose

from spamexperts import se_stats
from spamexperts import resources
from spamexperts import LSPControl
from spamexperts import se_logging
from spamexperts import software_update
from se_config import spamexpertsConfig
from spamexperts import addressclassifier
from spamexperts.fingerprint import fingerprint
from spamexperts.message import SEHeaderMessage
from spamexperts.Options import get_pathname_option
from spamexperts.storage import TransitMessagesZODB
from spamexperts.ProxyClassifier import ProxyClassifier
from spamexperts.OptionsClass import ON, OFF, BLOCK_SPAM
from spamexperts.FileCorpus import CarefulExpiryFileCorpus
from spamexperts.FileCorpus import SEGzipFileMessageFactory
from spamexperts.OptionsClass import BLOCKED, DELAYED, REMOVED
from spamexperts.OptionsClass import IS_HAM, IS_SPAM, IS_UNSURE
from spamexperts.FileCorpus import SEFileMessage, SEFileMessageFactory


class State(object):
    def __init__(self):
        '''Initialises the State object that holds the state of the app.
        The default settings are read from Options.py and bayescustomize.ini
        and are then overridden by the command-line processing code in the
        __main__ code below.'''
        self.logFile = None
        self.imapLogFile = None
        self.bayes = None
        self.platform_mutex = None
        self.fingerprint = fingerprint.Fingerprint()
        self.fingerprintclient = None
        self.versioncontrol = software_update.VersionControl()
        self.lspcontrol = LSPControl.LSPControl()
        self.proxyListeners = []
        self.proxies = []
        self.open_remote_connections = []
        self.reclassifier = ProxyClassifier()
        ProxyClassifier.state = self
        ProxyClassifier.init()
        self.gzipCache = options[('Storage', 'cache_use_gzip')]
        self.cacheExpiryDays = options[('Storage', 'cache_expiry_days')]
        self.runTestServer = False

    def init(self):
        assert hasattr(self, 'model_notifier'), \
            'The model needs to tell us about the notifier before init().'
        self.checkLSPStatus()
        SP = xmlrpclib.ServerProxy
        zope_user = 'home'
        zope_pass = '11073'
        address = 'http://%s:%s@%s' % (zope_user, zope_pass,
                                       options[('fingerprint', 'zopeserver')])
        self.fingerprintclient = SP(address)
        logfile = os.path.join(se_logging.log_dir, '_pop3proxy.log')
        self.logFile = open(logfile, 'wb', 0)
        if options[('globals', 'verbose')]:
            print >>sys.stderr, 'Opened', logfile
        logfile = os.path.join(se_logging.log_dir, '_imap4proxy.log')
        self.imapLogFile = open(logfile, 'wb', 0)
        if options[('globals', 'verbose')]:
            print >>sys.stderr, 'Opened', logfile
        self.totalSessions = 0
        self.activeSessions = 0
        self.lastBaseMessageName = ''
        self.uniquifier = 2
        self.spam_handling_option = BLOCK_SPAM
        self.status_switch = ON
        self.p2p_switch = OFF
        self.report_switch = ON
        self.lsp_switch = ON
        self.autoupdate_switch = True
        self.update_available = False
        self.createWorkers()
        self.training_queue = Queue.Queue()
        self.training_thread = threading.Thread(target=self.trainQueue)
        self.training_thread.setDaemon(True)
        self.training_thread.start()
        self.fingerprint_queue = Queue.Queue()
        self.fingerprint_thread = threading.Thread(target=self.fingerprintQueue)
        self.fingerprint_thread.setDaemon(True)
        self.fingerprint_thread.start()
        if POW:
            key_fn = os.path.join(resources.application_directory(),
                                  options[('globals', 'ssl_keyfile')])
            certificate_fn = os.path.join(resources.application_directory(),
                                          options[('globals',
                                                   'ssl_certificate_file')])
            self.rsa = POW.pemRead(POW.RSA_PRIVATE_KEY,
                                   open(key_fn).read(), 'spamexperts')
            self.x509 = POW.pemRead(POW.X509_CERTIFICATE,
                                    open(certificate_fn).read())
        else:
            self.rsa = None
            self.x509 = None

    def _store_and_close_db(self, db_att):
        # Store and close the named database attribute, ignoring errors on
        # close.
        if hasattr(self, db_att):
            getattr(self, db_att).store()
            try:
                getattr(self, db_att).close()
            except OSError:
                pass

    def close(self, callback=None):
        if not callback:
            callback = lambda x: x
        for proxy in self.proxies:
            if (hasattr(proxy, 'current_account') and
                    proxy.current_account is not None and
                    proxy.current_account.endswith('IMAP')):
                if options[('globals', 'verbose')]:
                    print 'Closing', proxy.current_account
                proxy.terminated = True
                proxy.close()
        callback(60)
        if options[('globals', 'verbose')]:
            print 'Closed all IMAP proxies.'
        now = time.time()
        count = 0
        while time.time() - now < 60:
            if not self.proxies:
                break
            callback(count)
            count += 1
            time.sleep(1)
        for proxy in self.proxies:
            if options[('globals', 'verbose')]:
                print 'Forcing proxy closed',
                if hasattr(proxy, 'current_account'):
                    print proxy.current_account
                else:
                    print
            proxy.terminated = True
            proxy.close()
        callback(65)
        if options[('globals', 'verbose')]:
            print 'Closed all proxies.'
        ProxyClassifier.finish_processing = True
        ProxyClassifier.processing_thread.join()
        callback(70)
        if options[('globals', 'verbose')]:
            print 'Finished processing thread.'
        close_platform_mutex(self.platform_mutex)
        self.platform_mutex = None
        callback(80)
        if options[('globals', 'verbose')]:
            print 'Released mutex.'
        if hasattr(self, 'training_thread'):
            self.training_thread.join()
        callback(90)
        if options[('globals', 'verbose')]:
            print 'Finished training thread.'
        if self.bayes is not None:
            if self.bayes.nham != 0 and self.bayes.nspam != 0:
                state.bayes.store()
            try:
                self.bayes.close()
            except OSError:
                pass
            self.bayes = None
        for db_att in ('address_classifier', 'blocked_messages',
                       'delayed_messages', 'delete_messages',
                       'message_info_database'):
            self._store_and_close_db(db_att)
        if options[('globals', 'verbose')]:
            print 'Stored and closed all databases.'
        self.spamCorpus = None
        self.hamCorpus = None
        self.unsureCorpus = None
        self.spamTrainer = None
        self.hamTrainer = None
        if options[('globals', 'verbose')]:
            print >>sys.stderr, 'State is closed.'
        callback(99)

    def createWorkers(self):
        '''Using the options that were initialised in __init__ and then
        possibly overridden by the driver code, create the Bayes object,
        the Corpuses, the Trainers and so on.'''
        if (options[('globals', 'verbose')] and
                options[('globals', 'verbose_level')] > 2):
            print >>sys.stderr, 'Setting up application data environment.'
        if not hasattr(self, 'DBName'):
            self.DBName = get_pathname_option('Storage',
                                              'persistent_storage_file')
        if not hasattr(self, 'useDB'):
            self.useDB = options[('Storage', 'persistent_use_database')]
        if (options[('globals', 'verbose')] and
                options[('globals', 'verbose_level')] > 2):
            print >>sys.stderr, 'Got DBName: %s and useDB: %s' % (self.DBName,
                                                                  self.useDB)
        self.bayes = open_storage(self.DBName, self.useDB)
        message_info_db_name = get_pathname_option('Storage',
                                                   'messageinfo_storage_file')
        if (options[('globals', 'verbose')] and
                options[('globals', 'verbose_level')] > 2):
            print >>sys.stderr, 'messageinfodb name: %s' % (message_info_db_name,)
        spamexpertsConfig.msginfoDB = open_message_storage(message_info_db_name,
                                                           self.useDB)
        self.message_info_database = spamexpertsConfig.msginfoDB
        SEHeaderMessage().message_info_db = spamexpertsConfig.msginfoDB
        fn = get_pathname_option('Storage', 'address_list_storage_file')
        if self.useDB == 'dbm':
            self.address_classifier = addressclassifier.DBDictAddressClassifier(fn)
        elif self.useDB == 'zodb':
            self.address_classifier = addressclassifier.ZODBAddressClassifier(fn)
        else:
            self.address_classifier = addressclassifier.ZODBAddressClassifier(fn)
        bm = get_pathname_option('Storage', 'blocked_messages_file')
        self.blocked_messages = TransitMessagesZODB(bm)
        dm = get_pathname_option('Storage', 'delayed_messages_file')
        self.delayed_messages = TransitMessagesZODB(dm)
        dd = get_pathname_option('Storage', 'delete_messages_file')
        self.delete_messages = TransitMessagesZODB(dd)
        sc = get_pathname_option('Storage', 'spam_cache')
        hc = get_pathname_option('Storage', 'ham_cache')
        uc = get_pathname_option('Storage', 'unsure_cache')
        wc = get_pathname_option('Storage', 'to_be_removed_cache')
        if options[('globals', 'verbose')]:
            print 'Spam cache:', sc
            print 'Ham cache:', hc
            print 'Unsure cache:', uc
            print 'Waiting cache:', wc
        map(ensureDir, [sc, hc, uc, wc])
        if self.gzipCache:
            factory = SEGzipFileMessageFactory()
        else:
            factory = SEFileMessageFactory()
        age = options[('Storage', 'cache_expiry_days')] * 24 * 60 * 60
        self.spamCorpus = CarefulExpiryFileCorpus(age, self.blocked_messages,
                                                  factory, sc, '*',
                                                  cacheSize=20)
        self.hamCorpus = CarefulExpiryFileCorpus(age, self.delayed_messages,
                                                 factory, hc, '*',
                                                 cacheSize=20)
        self.unsureCorpus = CarefulExpiryFileCorpus(age, self.delayed_messages,
                                                    factory, uc, '*',
                                                    cacheSize=20)
        self.waitingCorpus = FileCorpus(factory, wc, '*', cacheSize=1)
        self.spamCorpus.removeExpiredMessages()
        self.hamCorpus.removeExpiredMessages()
        self.unsureCorpus.removeExpiredMessages()
        self.spamTrainer = SpamTrainer(self.bayes)
        self.hamTrainer = HamTrainer(self.bayes)
        self.fingerprintSpamTrainer = FingerprintSpamTrainer(self.fingerprint,
                                                             self.fingerprintclient)
        self.fingerprintHamTrainer = FingerprintHamTrainer(self.fingerprint,
                                                           self.fingerprintclient)
        self.spamCorpus.addObserver(self.spamTrainer)
        self.spamCorpus.addObserver(self.fingerprintSpamTrainer)
        self.hamCorpus.addObserver(self.hamTrainer)
        self.hamCorpus.addObserver(self.fingerprintHamTrainer)
        self.numSpams = len(self.spamCorpus)
        self.numHams = len(self.hamCorpus)
        self.statistics = se_stats.SEStats(options, self.message_info_database,
                                           SEHeaderMessage)
        # Drop delayed-message records whose message is no longer in any of
        # the on-disk caches.
        for key in self.delayed_messages.classifier.db.keys():
            account = self.delayed_messages[key]
            for msg_id in account.keys():
                if (msg_id in self.hamCorpus.keys() or
                        msg_id in self.waitingCorpus.keys() or
                        msg_id in self.unsureCorpus.keys()):
                    continue
                print >>sys.stderr, 'Message', msg_id, "isn't in cache."
                del account[msg_id]
                self.delayed_messages[key] = account
                self.delayed_messages.store()
        if options[('globals', 'verbose')]:
            print >>sys.stderr, 'CreateWorkers finished.'

    def getNewMessageName(self):
        messageName = '%10.10d' % long(time.time())
        if messageName == self.lastBaseMessageName:
            messageName = '%s-%d' % (messageName, self.uniquifier)
            self.uniquifier += 1
        else:
            self.lastBaseMessageName = messageName
            self.uniquifier = 2
        return messageName

    def checkLSPStatus(self):
        '''Check the status of the LSP and set the GUI setting accordingly.'''
        actualstatus = self.lspcontrol.isInstalled()
        guistatus = spamexpertsConfig.enable_lsp
        if actualstatus != guistatus:
            spamexpertsConfig.enable_lsp = actualstatus

    def enableLSP(self):
        '''Enable the LSP.'''
        try:
            self.lspcontrol.installLSP()
            return True
        except AssertionError:
            return False

    def disableLSP(self):
        '''Disable the LSP.'''
        try:
            self.lspcontrol.removeLSP()
            return True
        except AssertionError:
            return False

    def isLSPEnabled(self):
        return self.lspcontrol.isInstalled()

    class _dummy_msg(object):
        '''Pretends to be a message, but without all the baggage.'''
        def __init__(self, key, corpus):
            self.key = lambda: key
            self.corpus = corpus

        def load(self):
            self.msg = self.corpus[self.key()]

        def __getitem__(self, key):
            return self.msg[key]

    def _storeDate(self, msg_id, corpus):
        msg = corpus[msg_id]
        msg.load()
        new_header = email.Utils.formatdate(time.time(), True)
        msg['X-SpamExperts-Date'] = new_header
        msg.store()
        corpus.cacheMessageHeaders(msg)
        return email.Utils.parsedate_tz(new_header)

    def _getIndividualHeaders(self, msg_id, corpus):
        headers = corpus.get_headers(self._dummy_msg(msg_id, corpus))
        # Prefer the date header we added ourselves, then the configured
        # header, then Delivery-Date, and finally the standard Date header.
        d = (headers['X-SpamExperts-Date'] or
             headers[spamexpertsConfig.date_header] or
             headers['Delivery-Date'] or
             headers['Date'])
        if d is None:
            date = time.time()
            self._storeDate(msg_id, corpus)
        else:
            time_tuple = email.Utils.parsedate_tz(d)
            if not time_tuple:
                time_tuple = self._storeDate(msg_id, corpus)
            try:
                date = email.Utils.mktime_tz(time_tuple)
            except OverflowError:
                date = 0.0
        subject = self.get_header(headers['Subject'])
        from_ = self.get_header(headers['From'])
        to = self.get_header(headers['To'])
        realfrom = from_
        from_ = email.Utils.parseaddr(from_)
        # Use the display name where there is one, else the address itself.
        from_ = from_[0] or from_[1]
        return (subject, from_, to, realfrom, date)

    def get_header(header):
        h = []
        for v, charset in email.Header.decode_header(header):
            try:
                h.append(unicode(v, charset or 'latin-1'))
            except (LookupError, UnicodeDecodeError):
                h.append(unicode(v, 'latin-1'))
        return ''.join(h)
    get_header = staticmethod(get_header)

    def _getMessagesHeadersFromCorpus(self, corpus, klass):
        '''Return all message headers from the corpus object in a dict.'''
        messages = {}
        klass = {IS_HAM: 'ham', IS_SPAM: 'spam', IS_UNSURE: 'unsure'}[klass]
        try:
            keys = corpus.keys()
        except AttributeError:
            return messages
        for i, msg_id in enumerate(keys):
            (subject, from_, to, realfrom,
             date) = self._getIndividualHeaders(msg_id, corpus)
            messages[i] = (from_, subject, date, None, to, klass, msg_id,
                           realfrom)
        return messages

    def getSpamMessagesHeaders(self):
        '''Return a dict containing all headers for each message from the
        spam corpus.'''
        return self._getMessagesHeadersFromCorpus(self.spamCorpus, IS_SPAM)

    def getHamMessagesHeaders(self):
        '''Return a dict containing all headers for each message from the
        ham corpus.'''
        return self._getMessagesHeadersFromCorpus(self.hamCorpus, IS_HAM)

    def getUnsureMessagesHeaders(self):
        '''Return a dict containing all headers for each message from the
        unsure corpus.'''
        return self._getMessagesHeadersFromCorpus(self.unsureCorpus, IS_UNSURE)

    def _getMissingMessage(self, msgid, corpus):
        body = ('Sorry, SpamExperts was unable to find this message (%s) in '
                'its cache.\r\n\r\nPlease restart SpamExperts as soon as is '
                'convenient for you.' % (msgid,))
        try:
            headers = corpus.headers[msgid]
            headers = ['%s:%s' % (k, v) for k, v in headers.iteritems()]
        except KeyError:
            headers = ('Subject: Missing message.',
                       'From: unknown@invalid.com',
                       'To: unknown@invalid.com')
        message_text = '%s\r\n\r\n%s' % ('\r\n'.join(headers), body)
        msg = email.message_from_string(message_text, _class=SEFileMessage)
        msg.loaded = True
        return msg

    def getMessage(self, msgid, klass):
        '''Return a tuple of from address, to address, subject, and body of
        the specified message.'''
        corpora = {IS_HAM: self.hamCorpus,
                   IS_SPAM: self.spamCorpus,
                   IS_UNSURE: self.unsureCorpus}
        # Look in the expected corpus first, then fall back to the others.
        corpora_to_try = [corpora[klass]] + corpora.values()
        msg = None
        for corpus in corpora_to_try:
            try:
                msg = corpus.get(msgid)
            except IOError:
                msg = self._getMissingMessage(msgid, corpus)
            if msg is not None:
                break
        if msg is None:
            print "Couldn't find", msgid
            msg = self._getMissingMessage(msgid, corpus)
        try:
            msg.load()
        except IOError:
            msg = self._getMissingMessage(msgid, corpus)
        body = msg.as_html()
        if not isinstance(body, types.UnicodeType):
            body = unicode(body, 'latin-1')
        (subject, from_, to, unused,
         unused) = self._getIndividualHeaders(msgid, corpus)
        return (from_, to, subject, body)

    def removeMessageFromCorpus(self, msg_id, klass):
        corpus = {IS_HAM: self.hamCorpus,
                  IS_SPAM: self.spamCorpus,
                  IS_UNSURE: self.unsureCorpus}[klass]
        msg = corpus.get(msg_id)
        if msg is None:
            print >>sys.stderr, "Can't find message to remove", msg_id, klass
            return None
        msg.load()
        msg.setId(msg_id)
        if msg.getBlockingState()[1] == DELAYED:
            if options[('globals', 'verbose')]:
                print >>sys.stderr, ("Can't remove message %s, it is still to "
                                     "be delivered. Moving aside." % (msg_id,))
            FileCorpus.removeMessage(corpus, msg,
                                     observer_flags=NO_TRAINING_FLAG)
            self.waitingCorpus.addMessage(msg)
            return None
        corpus.removeMessage(msg, observer_flags=NO_TRAINING_FLAG)

    def moveAndTrainMessages(self, msg_ids, sourceClass, destClass):
        '''Move (and appropriately train) a message or list of messages.'''
        self.training_queue.put((msg_ids, sourceClass, destClass))

    def fingerprintQueue(self):
        '''Background updating of the fingerprint server.

        This method should be started up in a separate thread at init() and
        will run until close().'''
        while self.platform_mutex is not None:
            try:
                msg = self.fingerprint_queue.get_nowait()
            except Queue.Empty:
                time.sleep(1)
                continue
            self.fingerprintSpamTrainer.train(msg)

    def trainQueue(self):
        '''Thread-safe training: train messages as they arrive in the
        training queue.

        This method should be started up in a separate thread at init() and
        will run until close().'''
        while self.platform_mutex is not None:
            try:
                via_moving = False
                training_item = self.training_queue.get_nowait()
                if len(training_item) == 3:
                    (msg_ids, sourceClass, destClass) = training_item
                    via_moving = True
                elif len(training_item) == 2:
                    (msg, isSpam) = training_item
                else:
                    print >>sys.stderr, 'Incorrect training data', training_item
                    continue
            except Queue.Empty:
                time.sleep(1)
                continue
            try:
                if via_moving:
                    self._moveAndTrainMessages(msg_ids, sourceClass, destClass)
                    if sourceClass == IS_HAM:
                        was_spam = options[('Headers', 'header_ham_string')]
                    elif sourceClass == IS_SPAM:
                        was_spam = options[('Headers', 'header_spam_string')]
                    else:
                        was_spam = options[('Headers', 'header_unsure_string')]
                    if destClass == IS_HAM:
                        self.statistics.RecordTraining(True,
                                                       old_class=was_spam)
                    elif destClass == IS_SPAM:
                        self.statistics.RecordTraining(False,
                                                       old_class=was_spam)
                else:
                    self.bayes.learn(msg.tokenize(), isSpam)
                    self.statistics.RecordTraining(not isSpam)
                    msg.RememberTrained(isSpam)
                    self.bayes.store()
            except (SystemError, SystemExit, KeyboardInterrupt):
                raise
            except Exception, e:
                print >>sys.stderr, 'Error in moving/training: %s' % (e,)
                if options[('globals', 'verbose')]:
                    import traceback
                    traceback.print_exc(None, sys.stderr)

    def _moveAndTrainMessages(self, msg_ids, sourceClass, destClass):
        sourceCorpus = {IS_HAM: self.hamCorpus,
                        IS_SPAM: self.spamCorpus,
                        IS_UNSURE: self.unsureCorpus}[sourceClass]
        destCorpus = {IS_HAM: self.hamCorpus,
                      IS_SPAM: self.spamCorpus,
                      IS_UNSURE: self.unsureCorpus}[destClass]
        if isinstance(msg_ids, types.StringType):
            msg_ids = (msg_ids,)
        for msg_id in msg_ids:
            if sourceClass == IS_SPAM:
                self.numSpams -= 1
            elif sourceClass == IS_HAM:
                self.numHams -= 1
            if destClass == IS_SPAM:
                self.numSpams += 1
                asSpam = True
            elif destClass == IS_UNSURE:
                asSpam = None
            elif destClass == IS_HAM:
                self.numHams += 1
                asSpam = False
            destCorpus.takeMessage(msg_id, sourceCorpus, fromCache=True)
            msg = destCorpus.get(msg_id)
            if msg is None:
                return None
            msg.RememberTrained(asSpam)
            (score, classification,
             clues) = self.reclassifier.classify_message(msg, NO_TRAINING_FLAG)
            msg.delHeaders()
            msg.addHeaders(prob=score, clues=clues)
            msg.store()
            destCorpus.cacheMessageHeaders(msg)
            (account, block_state) = msg.getBlockingState()
            new_state = None
            to_db = None
            from_db = None
            if sourceClass == IS_SPAM:
                if destClass == IS_HAM or destClass == IS_UNSURE:
                    if block_state == BLOCKED or block_state == REMOVED:
                        # Unblock: move the record from the blocked database
                        # to the delayed database and cancel any deletion.
                        new_state = DELAYED
                        try:
                            msg_info = self.blocked_messages.get(account)[msg_id]
                        except KeyError:
                            msg_info = {}
                        old = self.blocked_messages.get(account)
                        del old[msg_id]
                        self.blocked_messages[account] = old
                        self.blocked_messages.store()
                        old = self.delayed_messages.get(account)
                        old[msg_id] = msg_info
                        self.delayed_messages[account] = old
                        self.delayed_messages.store()
                        old = self.delete_messages.get(account)
                        try:
                            del old[msg_id]
                        except KeyError:
                            pass
                        self.delete_messages[account] = old
                        self.delete_messages.store()
            elif ((sourceClass == IS_HAM or sourceClass == IS_UNSURE) and
                  destClass == IS_SPAM and block_state == DELAYED):
                # Block: move the record from the delayed database to the
                # blocked database and schedule the message for deletion.
                new_state = BLOCKED
                try:
                    msg_info = self.delayed_messages.get(account)[msg_id]
                except KeyError:
                    msg_info = {}
                old = self.delayed_messages.get(account)
                del old[msg_id]
                self.delayed_messages[account] = old
                self.delayed_messages.store()
                old = self.blocked_messages.get(account)
                old[msg_id] = msg_info
                self.blocked_messages[account] = old
                self.blocked_messages.store()
                old = self.delete_messages.get(account)
                old[msg_id] = msg_info
                if options[('globals', 'verbose')]:
                    print 'Scheduling deletion of', msg_id
                self.delete_messages[account] = old
                self.delete_messages.store()
            if new_state:
                msg.rememberBlockingState(account, new_state)


class FingerprintTrainer(Trainer):
    '''Associates a fingerprint server and one or more corpora.

    This class is designed to be a corpus observer.'''

    def __init__(self, fingerprint, fingerprint_server, is_spam):
        self.fingerprint = fingerprint
        self.fingerprint_server = fingerprint_server
        self.is_spam = is_spam


class FingerprintSpamTrainer(FingerprintTrainer):
    '''Fingerprint trainer for spam.'''

    def __init__(self, fingerprint, fingerprint_server):
        FingerprintTrainer.__init__(self, fingerprint, fingerprint_server, True)

    def _get_fingerprints(self, msg):
        if hasattr(msg, 'load'):
            msg.load()
        msg_fingerprints = list(self.fingerprint.get_fingerprint(msg))
        return {'fingerprints': msg_fingerprints,
                'user_id': spamexpertsConfig.user_id}

    def train(self, msg):
        '''Train the fingerprint database with the message.'''
        mapping = self._get_fingerprints(msg)
        old_timeout = socket.getdefaulttimeout()
        try:
            socket.setdefaulttimeout(10)
            for unused in xrange(3):
                try:
                    result = self.fingerprint_server.store(mapping)
                except socket.error, e:
                    print >>sys.stderr, 'Timed out updating FP.'
                    time.sleep(0.5)
                    continue
                except Exception, e:
                    time.sleep(0.5)
                    continue
                if result[0] != True:
                    print >>sys.stderr, 'FP Error', result
                return
        finally:
            socket.setdefaulttimeout(old_timeout)
        # All three attempts failed; 'e' holds the last error.
        print >>sys.stderr, 'Could not update fingerprint server.', str(e)
        if options[('globals', 'verbose')]:
            import traceback
            traceback.print_exc(None, sys.stderr)

    def untrain(self, msg):
        '''Untrain the fingerprint database with the message.'''
        mapping = self._get_fingerprints(msg)
        old_timeout = socket.getdefaulttimeout()
        server = self.fingerprint_server
        try:
            socket.setdefaulttimeout(10)
            for unused in xrange(3):
                try:
                    result = server.dec_report_count(mapping)
                except socket.error, e:
                    print >>sys.stderr, 'Timed out updating FP.'
                    time.sleep(0.5)
                    continue
                except Exception, e:
                    time.sleep(0.5)
                    continue
                if result[0] != True:
                    print >>sys.stderr, 'FP Error', result
                return
        finally:
            socket.setdefaulttimeout(old_timeout)
        # All three attempts failed; 'e' holds the last error.
        print >>sys.stderr, 'Could not update fingerprint server.', str(e)
        if options[('globals', 'verbose')]:
            import traceback
            traceback.print_exc(None, sys.stderr)


class FingerprintHamTrainer(FingerprintTrainer):
    '''Fingerprint trainer for ham.'''

    def __init__(self, fingerprint, fingerprint_server):
        FingerprintTrainer.__init__(self, fingerprint, fingerprint_server,
                                    False)

    def train(self, unused):
        pass

    def untrain(self, unused):
        pass


state = State()
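

# Hypothetical usage sketch.  A full run of the state engine needs the
# SpamExperts application to attach a model notifier and drive init() and
# close(), so the guarded block below only exercises the self-contained
# helpers; the sample Subject header is made up for illustration.
if __name__ == '__main__':
    # Decode an RFC 2047 encoded header; get_header() tries the declared
    # charset and falls back to Latin-1 for unknown charsets or bad bytes.
    print repr(State.get_header('=?utf-8?q?caf=C3=A9_offer?='))
    # Report where the persistent classifier database is configured to live.
    print 'Bayes database:', get_pathname_option('Storage',
                                                 'persistent_storage_file')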